package cn.sheep.violet.etl

import cn.sheep.violet.utils.NBUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

/** Converts bzip2-compressed raw log files into the parquet file format.
  * author: old sheep
  * QQ: 64341393
  * Created 2018/10/15
  */
object Bz2Parquet {

    /** Exit code returned when the command-line arguments are invalid ($?). */
    private val ExitCodeInvalidArgs = 101

    /** A raw log line must split into at least this many comma-separated fields
      * to be considered well-formed (positions 0..84 are consumed by [[toRow]]). */
    private val RequiredFieldCount = 85

    /**
      * Entry point.
      *
      * Reads raw (bzip2-compressed) CSV log files, parses each valid line into
      * a typed [[Row]] and writes the result as snappy-compressed parquet.
      *
      * args(0): dataInputPath  — input path of the raw logs
      * args(1): dataOutputPath — parquet output path (deleted first if present)
      */
    def main(args: Array[String]): Unit = {

        // Validate arguments before paying the cost of starting Spark.
        if (args.length != 2) {
            println(
                """
                  |Usage:
                  | cn.sheep.violet.etl.Bz2Parquet
                  | args:
                  |     dataInputPath:  原始日志输入路径
                  |     dataOutputPath: parquet存储路径
                """.stripMargin)
            sys.exit(ExitCodeInvalidArgs)
        }

        // Destructure the two positional arguments via pattern matching.
        val Array(dataInputPath, dataOutputPath) = args

        val sparkConf = new SparkConf()
        sparkConf.setAppName("日志转parquet")
        // Fix: only default to local mode when no master was supplied
        // (e.g. via `spark-submit --master`). The previous hard-coded
        // setMaster("local[*]") silently overrode the submit-time master
        // and made cluster deployment impossible.
        sparkConf.setIfMissing("spark.master", "local[*]")
        // Kryo serialization is faster and more compact than Java serialization.
        sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

        val sc = new SparkContext(sparkConf)

        // Fix: run the job inside try/finally so the SparkContext is released
        // even when a stage fails; the original leaked it on any exception.
        try {
            // Read the raw logs (Hadoop decompresses .bz2 transparently).
            val rawData: RDD[String] = sc.textFile(dataInputPath)

            // RDD[String] => RDD[Row]: split with limit -1 so trailing empty
            // fields are kept, then drop malformed (too-short) lines.
            val rowData: RDD[Row] = rawData
                .map(_.split(",", -1))
                .filter(_.length >= RequiredFieldCount)
                .map(toRow)

            val sQLContext = new SQLContext(sc)
            // Compress the parquet output with snappy.
            sQLContext.setConf("spark.sql.parquet.compression.codec", "snappy")
            val dataFrame = sQLContext.createDataFrame(rowData, logSchema)

            /**
              * Delete the output path first so the write does not fail on rerun.
              * Whether `fs` is the local or a distributed file system depends on
              * the client's fs.defaultFS setting:
              *     file:///            local file system
              *     hdfs://host:9000/   distributed file system
              */
            val fs = FileSystem.get(sc.hadoopConfiguration)
            val path = new Path(dataOutputPath)
            if (fs.exists(path)) {
                fs.delete(path, true)
            }

            // Persist as parquet.
            dataFrame.write/*.partitionBy("provincename", "cityname")*/.parquet(dataOutputPath)
        } finally {
            // Always release the SparkContext.
            sc.stop()
        }
    }

    /** Converts one split log line (>= RequiredFieldCount fields) into a Row
      * whose positions align one-to-one with [[logSchema]]. */
    private def toRow(fields: Array[String]): Row = {
        // toIntPlus / toDoublePlus come from the project's SheepString
        // implicit enrichment — presumably null/garbage-safe numeric parsing;
        // TODO(review): confirm their failure semantics against SheepString.
        import cn.sheep.violet.bean.SheepString._
        Row(
            fields(0),
            NBUtils.str2Int(fields(1)),
            fields(2).toIntPlus,
            fields(3).toIntPlus,
            fields(4).toIntPlus,
            fields(5),
            fields(6),
            fields(7).toIntPlus,
            fields(8).toIntPlus,
            fields(9).toDoublePlus,
            fields(10).toDoublePlus,
            fields(11),
            fields(12),
            fields(13),
            fields(14),
            fields(15),
            fields(16),
            fields(17).toIntPlus,
            fields(18),
            fields(19),
            fields(20).toIntPlus,
            fields(21).toIntPlus,
            fields(22),
            fields(23),
            fields(24),
            fields(25),
            fields(26).toIntPlus,
            fields(27),
            fields(28).toIntPlus,
            fields(29),
            fields(30).toIntPlus,
            fields(31).toIntPlus,
            fields(32).toIntPlus,
            fields(33),
            fields(34).toIntPlus,
            fields(35).toIntPlus,
            fields(36).toIntPlus,
            fields(37),
            fields(38).toIntPlus,
            fields(39).toIntPlus,
            fields(40).toDoublePlus,
            fields(41).toDoublePlus,
            fields(42).toIntPlus,
            fields(43),
            fields(44).toDoublePlus,
            fields(45).toDoublePlus,
            fields(46),
            fields(47),
            fields(48),
            fields(49),
            fields(50),
            fields(51),
            fields(52),
            fields(53),
            fields(54),
            fields(55),
            fields(56),
            fields(57).toIntPlus,
            fields(58).toDoublePlus,
            fields(59).toIntPlus,
            fields(60).toIntPlus,
            fields(61),
            fields(62),
            fields(63),
            fields(64),
            fields(65),
            fields(66),
            fields(67),
            fields(68),
            fields(69),
            fields(70),
            fields(71),
            fields(72),
            fields(73).toIntPlus,
            fields(74).toDoublePlus,
            fields(75).toDoublePlus,
            fields(76).toDoublePlus,
            fields(77).toDoublePlus,
            fields(78).toDoublePlus,
            fields(79),
            fields(80),
            fields(81),
            fields(82),
            fields(83),
            fields(84).toIntPlus
        )
    }

    /** Schema of the target parquet table: column names and types, positionally
      * aligned with the fields produced by [[toRow]]. */
    private val logSchema: StructType = StructType(
        Seq(
            StructField("sessionid", StringType),
            StructField("advertisersid", IntegerType),
            StructField("adorderid", IntegerType),
            StructField("adcreativeid", IntegerType),
            StructField("adplatformproviderid", IntegerType),
            StructField("sdkversion", StringType),
            StructField("adplatformkey", StringType),
            StructField("putinmodeltype", IntegerType),
            StructField("requestmode", IntegerType),
            StructField("adprice", DoubleType),
            StructField("adppprice", DoubleType),
            StructField("requestdate", StringType),
            StructField("ip", StringType),
            StructField("appid", StringType),
            StructField("appname", StringType),
            StructField("uuid", StringType),
            StructField("device", StringType),
            StructField("client", IntegerType),
            StructField("osversion", StringType),
            StructField("density", StringType),
            StructField("pw", IntegerType),
            StructField("ph", IntegerType),
            StructField("long", StringType),
            StructField("lat", StringType),
            StructField("provincename", StringType),
            StructField("cityname", StringType),
            StructField("ispid", IntegerType),
            StructField("ispname", StringType),
            StructField("networkmannerid", IntegerType),
            StructField("networkmannername", StringType),
            StructField("iseffective", IntegerType),
            StructField("isbilling", IntegerType),
            StructField("adspacetype", IntegerType),
            StructField("adspacetypename", StringType),
            StructField("devicetype", IntegerType),
            StructField("processnode", IntegerType),
            StructField("apptype", IntegerType),
            StructField("district", StringType),
            StructField("paymode", IntegerType),
            StructField("isbid", IntegerType),
            StructField("bidprice", DoubleType),
            StructField("winprice", DoubleType),
            StructField("iswin", IntegerType),
            StructField("cur", StringType),
            StructField("rate", DoubleType),
            StructField("cnywinprice", DoubleType),
            StructField("imei", StringType),
            StructField("mac", StringType),
            StructField("idfa", StringType),
            StructField("openudid", StringType),
            StructField("androidid", StringType),
            StructField("rtbprovince", StringType),
            StructField("rtbcity", StringType),
            StructField("rtbdistrict", StringType),
            StructField("rtbstreet", StringType),
            StructField("storeurl", StringType),
            StructField("realip", StringType),
            StructField("isqualityapp", IntegerType),
            StructField("bidfloor", DoubleType),
            StructField("aw", IntegerType),
            StructField("ah", IntegerType),
            StructField("imeimd5", StringType),
            StructField("macmd5", StringType),
            StructField("idfamd5", StringType),
            StructField("openudidmd5", StringType),
            StructField("androididmd5", StringType),
            StructField("imeisha1", StringType),
            StructField("macsha1", StringType),
            StructField("idfasha1", StringType),
            StructField("openudidsha1", StringType),
            StructField("androididsha1", StringType),
            StructField("uuidunknow", StringType),
            StructField("userid", StringType),
            StructField("iptype", IntegerType),
            StructField("initbidprice", DoubleType),
            StructField("adpayment", DoubleType),
            StructField("agentrate", DoubleType),
            StructField("lrate", DoubleType),
            StructField("adxrate", DoubleType),
            StructField("title", StringType),
            StructField("keywords", StringType),
            StructField("tagid", StringType),
            StructField("callbackdate", StringType),
            StructField("channelid", StringType),
            StructField("mediatype", IntegerType)
        )
    )

}
